データオーケストレーションツールのDagsterを使ってみた
大阪オフィスの玉井です。
dbt界隈の人たちがこぞって推奨している(ように思える)ツールであるDagsterを使ってみました。
Dasterとは?
公式の紹介文を引用します。
Dagster is a data orchestrator. It lets you define pipelines (DAGs) in terms of the data flow between logical components called solids. These pipelines can be developed locally and run anywhere.
「データオーケストレーター」と言われると、なかなかピンときませんが、ジョブ管理ツールの一種と思っていただければわかりやすいと思います(Apache Airflow等と同じカテゴリ)。データパイプラインの開発はもちろん、一連の処理の運用まで行えるツールになっています。
基本的な使い方をやってみた
検証環境
- macOS Catalina 10.15.7
- Dagster 0.10.9
- Python 3.9.2
- pip 21.0.1
Dagsterをインストールする
インストールはpipで行います。
$ pip install dagster dagit
dagster
というのは、その名の通り、ツール本体です。dagit
というのは、Dagsterを操作できるUI等になります。
1つの処理を動かす
まずは、こちらのチュートリアルをやってみます。
まず、処理する元データを入手しておきます。
$ curl -O https://raw.githubusercontent.com/dagster-io/dagster/master/examples/docs_snippets/docs_snippets/intro_tutorial/cereal.csv
そして、いきなりですが、下記をDagsterで動かしてみます。こちらはDagsterのチュートリアルのコードです(一部日本語に変えています)。
import csv import os from dagster import execute_pipeline, pipeline, solid @solid def hello_cereal(context): # データセットがこのファイルと同じディレクトリにあると仮定 dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv") with open(dataset_path, "r") as fd: # 標準のcsvライブラリを使用して行を読み込む cereals = [row for row in csv.DictReader(fd)] context.log.info( "シリアルに関するデータは{n_cereals}件あります。".format(n_cereals=len(cereals)) ) return cereals @pipeline def hello_cereal_pipeline(): hello_cereal()
Dagsterの特徴的な要素として、solid
があります。一連のワークフローにおいて、中の一つ一つの処理を、Dagsterではsolid
という単位で定義します。めちゃくちゃ雑にいうと、基本、1処理1solidみたいな感じですね。
そして、作った処理はpipeline
でつなげていきます。ただ、このコードでは、solid
は1つしかないので、そのsolid
を普通に呼び出すだけになっています。
作成した一連の処理の実行方法には色々ありますが、今回はDagsterをUIで操作できるdagit
を使ってみます。dagitを起動するには、下記のコマンドを実行します。
$ dagit -f hello_cereal.py ... Loading repository... Serving on http://127.0.0.1:3000 in process xxxxx
ローカル内にdagitが立ち上がるので、表示されたIPを、Webブラウザで接続すると、DagsterのUIが表示されます。
画面には、定義されているsolid
が表示されます。今回のsolid
は1つだけなので、ぽつんと1つだけ存在しています。solid
を選ぶと、処理の詳細情報を確認することができます。
処理の実行は、PlaygroundというメニューからLanch Executionというボタンを選択して行うことが出来ます。
Lanch Executionを選択すると、solid
として定義した処理が実際に実行されました。上記のコードを見てもらえばわかる通り、このsolid
は、同ディレクトリに存在するcsvファイルを読み込み、件数がメッセージとともに出力されるという内容になります。処理結果は画面下部にログ形式で表示されます。
依存関係のある2つの処理を動かす
先程は単体のsolid
一つだけでしたが、今度は2つのsolid
を組み合わせてみます。最初のsolidでデータを取り出し、そのデータを2つめのsolidで処理する…という流れです。
import csv import os from dagster import execute_pipeline, pipeline, solid @solid def load_cereals(context): csv_path = os.path.join(os.path.dirname(__file__), "cereal.csv") with open(csv_path, "r") as fd: cereals = [row for row in csv.DictReader(fd)] context.log.info(f"シリアルに関するデータは {len(cereals)} 件あります。".format()) return cereals @solid def sort_by_calories(context, cereals): sorted_cereals = list( sorted(cereals, key=lambda cereal: cereal["calories"]) ) context.log.info(f'一番カロリーが高いのは {sorted_cereals[-1]["name"]} です。') @pipeline def serial_pipeline(): sort_by_calories(load_cereals())
※コードは公式チュートリアルのもので、メッセージ等だけ日本語に変えてます。
load_cereals
でcsvファイルを読み込み、データ部分だけを出力します。そして、sort_by_calories
でそのデータを受け取り、caloriesというカラムでデータを並べ替え、一番上にきたデータをメッセージとして出力します。
sort_by_calories
は、load_cereals
の返り値があることが前提の処理となっています。言い換えれば、sort_by_calories
はload_cereals
に依存していることになります。データパイプライン等の、データの一連の処理というのは、往々にして、各処理がこのような依存関係になっています。
各々の処理(solid
)に対する依存関係は、Dagsterではpipeline
という部分で表現します。例えば、今回の処理は、sort_by_calories(load_cereals())
と記述します。
dagitで読みこむと、下記のようになります。依存関係が視覚的に表現されていますね。
実際に実行してみました。結果も正しく出力されています。
並行処理
今度は、2つのsolid
を並行処理するパターンをやってみます。チュートリアルのコードはこちら(一部だけ日本語に変えてます)。
import csv import os from dagster import execute_pipeline, pipeline, solid @solid def load_cereals(_): dataset_path = os.path.join(os.path.dirname(__file__), "cereal.csv") with open(dataset_path, "r") as fd: cereals = [row for row in csv.DictReader(fd)] return cereals @solid def sort_by_calories(_, cereals): sorted_cereals = list( sorted(cereals, key=lambda cereal: cereal["calories"]) ) most_calories = sorted_cereals[-1]["name"] return most_calories @solid def sort_by_protein(_, cereals): sorted_cereals = list( sorted(cereals, key=lambda cereal: cereal["protein"]) ) most_protein = sorted_cereals[-1]["name"] return most_protein @solid def display_results(context, most_calories, most_protein): context.log.info(f"一番カロリーが高いのは {most_calories} です") context.log.info(f"一番プロテインが含まれているのは {most_protein} です") @pipeline def complex_pipeline(): cereals = load_cereals() display_results( most_calories=sort_by_calories(cereals), most_protein=sort_by_protein(cereals), )
ここらへんまでくると、とりあえずdagitで見たほうが処理の流れがわかりやすいので、先に見てみましょう。
csvファイルを読み込んだ後(load_cereals
)、先程のカロリー別のソートに加えて、もう一つのソート処理が増えています(sort_by_protein
)。これは、sort_by_calories
と並行して処理されるようになっています。そしてこの2つの処理の返り値を受け取って、最後にdisplay_results
が実行されるようになっています。
2つのソート処理は、両方ともload_cereals
に依存しています。しかし、ソート処理同士は依存していません。ですので、この2つの処理は並行して実行されるというわけです。最後のdisplay_results
は、2つのソート処理に依存しています。これをpipeline
で表現すると、上記のようになります。
実行結果は下記の通り。
おわりに
Dagsterは「データオーケストレーションツール」ということで、作成した処理をどういう風に運用するか等の観点での機能も色々あります。また、処理の開発方法についても(solidや他の概念など)、今回はほんの基本レベルしか触れられていません。さらに、本番運用する場合は、別途本番環境にデプロイする必要がありますが、その方法についても色々あります。
Dagsterについても、色々試してみては、ブログに残していきたいですね。